/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package transientstore

import (
	"errors"

	"github.com/golang/protobuf/proto"
	"github.com/hyperledger/fabric/protos/ledger/rwset"

	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
	"github.com/hyperledger/fabric/core/ledger"
	"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
	"github.com/syndtr/goleveldb/leveldb/iterator"
)

var emptyValue = []byte{}

// ErrStoreEmpty is used to indicate that there are no entries in transient store
var ErrStoreEmpty = errors.New("Transient store is empty")

//////////////////////////////////////////////
// Interfaces and data types
/////////////////////////////////////////////

// StoreProvider provides an instance of a TransientStore
type StoreProvider interface {
	OpenStore(ledgerID string) (Store, error)
	Close()
}

// Store manages the storage of private read-write sets for a ledgerId.
// Ideally, a ledger can remove the data from this storage when it is committed to
// the permanent storage or the pruning of some data items is enforced by the policy
type Store interface {
	// Persist stores the private read-write set of a transaction in the transient store
	Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error
	// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
	// RWSets persisted from different endorsers (via Gossip)
	GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*rwsetScanner, error)
	// GetSelfSimulatedTxPvtRWSetByTxid returns the private read-write set generated from the simulation
	// performed by the peer itself
	GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*EndorserPvtSimulationResults, error)
	// Purge removes private read-writes set generated by endorsers at block height lesser than
	// a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets
	// that were generated at block height of maxBlockNumToRetain or higher.
	Purge(maxBlockNumToRetain uint64) error
	// GetMinEndorsementBlkHt returns the lowest retained endorsement block height
	GetMinEndorsementBlkHt() (uint64, error)
	Shutdown()
}

// EndorserPvtSimulationResults captures the deatils of the simulation results specific to an endorser
type EndorserPvtSimulationResults struct {
	EndorserID             string
	EndorsementBlockHeight uint64
	PvtSimulationResults   *rwset.TxPvtReadWriteSet
}

//////////////////////////////////////////////
// Implementation
/////////////////////////////////////////////

// storeProvider encapsulates a leveldb provider which is used to store
// private read-write set of simulated transactions, and implements TransientStoreProvider
// interface.
type storeProvider struct {
	dbProvider *leveldbhelper.Provider
}

// store holds an instance of a levelDB.
type store struct {
	db       *leveldbhelper.DBHandle
	ledgerID string
}

type rwsetScanner struct {
	txid   string
	dbItr  iterator.Iterator
	filter ledger.PvtNsCollFilter
}

// NewStoreProvider instantiates TransientStoreProvider
func NewStoreProvider() StoreProvider {
	dbProvider := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: GetTransientStorePath()})
	return &storeProvider{dbProvider: dbProvider}
}

// OpenStore returns a handle to a ledgerId in Store
func (provider *storeProvider) OpenStore(ledgerID string) (Store, error) {
	dbHandle := provider.dbProvider.GetDBHandle(ledgerID)
	return &store{db: dbHandle, ledgerID: ledgerID}, nil
}

// Close closes the TransientStoreProvider
func (provider *storeProvider) Close() {
	provider.dbProvider.Close()
}

// Persist stores the private read-write set of a transaction in the transient store
func (s *store) Persist(txid string, endorserid string,
	endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error {
	dbBatch := leveldbhelper.NewUpdateBatch()

	// Create compositeKey with appropriate prefix, txid, endorserid and endorsementBlkHt
	compositeKey := createCompositeKeyForPvtRWSet(txid, endorserid, endorsementBlkHt)
	privateSimulationResultsBytes, err := proto.Marshal(privateSimulationResults)
	if err != nil {
		return err
	}
	dbBatch.Put(compositeKey, privateSimulationResultsBytes)

	// Create compositeKey with appropriate prefix, endorsementBlkHt, txid, endorserid & Store
	// the compositeKey (purge index) a null byte as value.
	compositeKey = createCompositeKeyForPurgeIndex(endorsementBlkHt, txid, endorserid)
	dbBatch.Put(compositeKey, emptyValue)

	return s.db.WriteBatch(dbBatch, true)
}

// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
// RWSets persisted from different endorserids. Eventually, we may pass a set of collections name
// (filters) along with txid.
func (s *store) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*rwsetScanner, error) {
	// Construct startKey and endKey to do an range query
	startKey := createTxidRangeStartKey(txid)
	endKey := createTxidRangeEndKey(txid)

	iter := s.db.GetIterator(startKey, endKey)
	return &rwsetScanner{txid, iter, filter}, nil
}

// GetSelfSimulatedTxPvtRWSetByTxid returns the private write set generated from the simulation performed
// by the peer itself. Eventually, we may pass a set of collections name (filters) along with txid.
func (s *store) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*EndorserPvtSimulationResults, error) {
	var err error
	var itr *rwsetScanner
	if itr, err = s.GetTxPvtRWSetByTxid(txid, nil); err != nil {
		return nil, err
	}
	defer itr.Close()
	var res *EndorserPvtSimulationResults
	for {
		if res, err = itr.Next(); res == nil || err != nil {
			return nil, err
		}
		if selfSimulated(res) {
			return res, nil
		}
	}
}

// Purge removes private read-writes set generated by endorsers at block height lesser than
// a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets
// that were generated at block height of maxBlockNumToRetain or higher.
func (s *store) Purge(maxBlockNumToRetain uint64) error {
	// Do a range query with 0 as startKey and maxBlockNumToRetain-1 as endKey
	startKey := createEndorsementBlkHtRangeStartKey(0)
	endKey := createEndorsementBlkHtRangeEndKey(maxBlockNumToRetain - 1)
	iter := s.db.GetIterator(startKey, endKey)

	dbBatch := leveldbhelper.NewUpdateBatch()

	// Get all txid and endorserid from above result and remove it from transient store (both
	// read/write set and the corresponding index.
	for iter.Next() {
		dbKey := iter.Key()
		txid, endorserid, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey)
		compositeKey := createCompositeKeyForPvtRWSet(txid, endorserid, endorsementBlkHt)
		dbBatch.Delete(compositeKey)
		dbBatch.Delete(dbKey)
	}
	return s.db.WriteBatch(dbBatch, true)
}

// GetMinEndorsementBlkHt returns the lowest retained endorsement block height
func (s *store) GetMinEndorsementBlkHt() (uint64, error) {
	// Current approach performs a range query on purgeIndex with startKey
	// as 0 (i.e., endorsementBlkHt) and returns the first key which denotes
	// the lowest retained endorsement block height. An alternative approach
	// is to explicitly store the minEndorsementBlkHt in the transientStore.
	startKey := createEndorsementBlkHtRangeStartKey(0)
	iter := s.db.GetIterator(startKey, nil)
	// Fetch the minimum endorsement block height
	if iter.Next() {
		dbKey := iter.Key()
		_, _, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey)
		return endorsementBlkHt, nil
	}
	// Returning an error may not be the right thing to do here. May be
	// return a bool. -1 is not possible due to unsigned int as first
	// return value
	return 0, ErrStoreEmpty
}

func (s *store) Shutdown() {
	// do nothing because shared db is used
}

// Next moves the iterator to the next key/value pair.
// It returns whether the iterator is exhausted.
func (scanner *rwsetScanner) Next() (*EndorserPvtSimulationResults, error) {
	if !scanner.dbItr.Next() {
		return nil, nil
	}
	dbKey := scanner.dbItr.Key()
	dbVal := scanner.dbItr.Value()
	endorserid, endorsementBlkHt := splitCompositeKeyOfPvtRWSet(dbKey)

	txPvtRWSet := &rwset.TxPvtReadWriteSet{}
	if err := proto.Unmarshal(dbVal, txPvtRWSet); err != nil {
		return nil, err
	}
	filteredTxPvtRWSet := pvtdatastorage.TrimPvtWSet(txPvtRWSet, scanner.filter)

	return &EndorserPvtSimulationResults{
		EndorserID:             endorserid,
		EndorsementBlockHeight: endorsementBlkHt,
		PvtSimulationResults:   filteredTxPvtRWSet,
	}, nil
}

// Close releases resource held by the iterator
func (scanner *rwsetScanner) Close() {
	scanner.dbItr.Release()
}

func selfSimulated(pvtSimRes *EndorserPvtSimulationResults) bool {
	return pvtSimRes.EndorserID == ""
}
